package day03.sink;

import beans.SensorReading;
import day02.source.FlinkSource;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

/**
 * Flink streaming API - sink.
 * <p>
 * Writes the processed stream out to Kafka.
 *
 * @author lvbingbing
 * @date 2021-12-19 20:43
 */
public class FlinkSink00 {
    public static void main(String[] args) throws Exception {
        // 1. Obtain the stream execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 keeps the output ordering deterministic for this demo.
        env.setParallelism(1);
        // 2. Read data from the Kafka source.
        DataStream<String> dataStream = readFromKafka(env);
        // 3. Write the data read from Kafka back out to Kafka.
        studyWriteToKafka(dataStream);
        // 4. Trigger job execution (Flink programs are lazy until execute()).
        env.execute();
    }

    /**
     * Reads raw sensor lines from the Kafka topic {@code sensor}, parses each
     * CSV line into a {@link SensorReading}, and re-serializes it via
     * {@code toString()}.
     *
     * @param env the stream execution environment to attach the source to
     * @return a stream of stringified {@link SensorReading} records
     * @see FlinkSource#readFromKafka() reading from a Kafka source
     * @see StringDeserializer key.deserializer / value.deserializer
     * @see OffsetResetStrategy#LATEST auto.offset.reset
     */
    private static DataStream<String> readFromKafka(StreamExecutionEnvironment env) {
        Properties properties = new Properties();
        properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group");
        // Use the imported class constant instead of a hand-typed FQCN string,
        // so a typo fails at compile time rather than at runtime.
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        // Read from the Kafka source topic.
        String topic = "sensor";
        DataStream<String> dataStream = env.addSource(new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), properties));
        return dataStream.map(line -> {
            // Expected line format: "id,timestamp,temperature" — TODO confirm with producer.
            String[] fields = line.split(",");
            // parseLong/parseDouble replace the boxing constructors new Long(..)/new Double(..),
            // which are deprecated since Java 9.
            SensorReading sensorReading = new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
            return sensorReading.toString();
        });
    }

    /**
     * Study example: writes each record of the stream to the Kafka topic
     * {@code sinkTest} using a simple string serialization schema.
     *
     * @param dataStream the stream of string records to sink to Kafka
     */
    private static void studyWriteToKafka(DataStream<String> dataStream) {
        String brokerList = "hadoop102:9092";
        String topicId = "sinkTest";
        dataStream.addSink(new FlinkKafkaProducer011<>(brokerList, topicId, new SimpleStringSchema()));
    }
}
